refactor: adopt mixin chain + emit per-phase spans #49
Merged
Conversation
- OmniExecutorBase inherits (InferenceMixin, Executor) so the four
omni executors pick up GovernanceMixin / DataMixin / InferenceMixin
from the same chain the inference and training executors use.
- Each concrete omni executor wraps run() with self._task_span(...)
so a 'task' root span is emitted with executor.name + workflow_id.
- Inside run(), three per-phase compute spans are added — 'model
load' (_ensure_omni), 'generation' (the omni.generate call(s)),
and 'output postprocessing' (artifact save + items build) — mirroring
the vllm executor's tracing shape.
- New tests/worker/test_omni_executor_inheritance.py asserts the
full mixin chain on each omni executor class as a regression
guard on the base-class hierarchy.
Live e2e on a single GPU worker against all four omni templates
(omni_text2{speech,image,audio,general}.yaml): each task reports
ok=True with the expected artifact, and spans.jsonl contains the
'task' root plus 'model load' / 'generation' / 'output postprocessing'
sub-spans.
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
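The span shape the commit describes (a `task` root span wrapping three compute sub-spans) can be sketched with a toy tracer. `SpanRecorder`, `span()`, and `run()` below are illustrative stand-ins, not the project's real tracing API; the real executors write rows to `spans.jsonl`.

```python
import contextlib
import time


class SpanRecorder:
    """Collects span rows in memory; the real executor persists them as JSONL."""

    def __init__(self):
        self.rows = []

    @contextlib.contextmanager
    def span(self, name, **attrs):
        start = time.monotonic()
        try:
            yield
        finally:
            # A span's row exists only after its context exits.
            self.rows.append(
                {"name": name, "duration_s": time.monotonic() - start, **attrs}
            )


def run(recorder, workflow_id):
    # 'task' root span carries executor name + workflow_id, as in the PR.
    with recorder.span("task", executor="omni_text2speech", workflow_id=workflow_id):
        with recorder.span("model load"):             # _ensure_omni
            model = object()
        with recorder.span("generation"):             # omni.generate call(s)
            outputs = ["wav-bytes"]
        with recorder.span("output postprocessing"):  # artifact save + items build
            items = [{"artifact": o} for o in outputs]
    return items
```

Note that the root `task` row is recorded last, only after its `__exit__` runs — the same ordering that forces the trace upload to happen after the span closes.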
…spec Replace the executor-local collect_text_inputs helper with the mixin's _collect_prompts_for_spec. Each omni executor now narrows PromptInput to str inline and raises ExecutionError if any item is not a string. Templates adopt the canonical data.type: list / items shape. Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
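The inline narrowing this commit describes — accept only the plain-string form of `PromptInput` and raise `ExecutionError` otherwise — could look roughly like the sketch below. `PromptInput`, `ExecutionError`, and the helper name are assumptions standing in for the project's real types.

```python
from typing import Union

# Real PromptInput also admits a chat-message (list-of-dict) form,
# which the omni executors do not consume.
PromptInput = Union[str, list]


class ExecutionError(RuntimeError):
    pass


def narrow_to_str(raw_prompts: list) -> list:
    """Reject any prompt that is not a plain string."""
    for i, p in enumerate(raw_prompts):
        if not isinstance(p, str):
            raise ExecutionError(
                f"prompt {i} is not a string: got {type(p).__name__}"
            )
    return list(raw_prompts)
```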
Move maybe_upload_artifacts out of the task span and add a missing maybe_upload_traces call right after, matching the vllm and diffusers pattern. Without the trace upload, omni span JSONL stayed on remote workers and never reached the server's /traces endpoint in HTTP mode. Also extract _run_inner in the image and speech executors so the post-span fall-through is the same shape across all four. Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
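Why the upload has to come after the span exits can be shown with a minimal sketch: a span writer that appends its row to `spans.jsonl` only on `__exit__`. An upload issued inside the span would read the file before the root row is flushed. `task_span` and `collect_uploaded` are illustrative names, not the project's API.

```python
import contextlib
import json
import pathlib
import tempfile


@contextlib.contextmanager
def task_span(path: pathlib.Path, name: str):
    yield
    # The root span row is written only when the context exits.
    with path.open("a") as f:
        f.write(json.dumps({"name": name}) + "\n")


def collect_uploaded(path: pathlib.Path) -> list:
    """Stand-in for the trace upload: read whatever rows exist right now."""
    return [json.loads(line)["name"] for line in path.read_text().splitlines()]


def demo():
    with tempfile.TemporaryDirectory() as d:
        spans = pathlib.Path(d) / "spans.jsonl"
        spans.touch()
        with task_span(spans, "task"):
            inside = collect_uploaded(spans)  # upload inside the span: root row missing
        after = collect_uploaded(spans)       # upload after __exit__: root row present
    return inside, after
```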
Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Force-pushed 47dd136 to 188e305.
kaiitunnz (Collaborator) requested changes on May 15, 2026:
Minor comments. Additional consideration:
Another cleanup you should consider: move the run method from the omni executor classes to OmniExecutorBase, define _run_inner as an abstract method whose spec parameter is of type TaskSpecStrictBase, and define a class attribute _TASK_SPEC_TYPE. That way, OmniExecutorBase can call self.require_spec(task, self._TASK_SPEC_TYPE) inside the generic self.run.
A drawback of this approach is that every concrete _run_inner needs assert isinstance(spec, <spec-type>) as its first line to recover the concrete type.
Each concrete omni executor's run() did the same five things: resolve spec, dump dict, normalize out_dir, run the task span, upload artifacts and traces. Move that boilerplate to OmniExecutorBase.run() and let subclasses contribute via a _TASK_SPEC_TYPE class attribute plus an abstract _run_inner whose first line is `assert isinstance(spec, ...)` to recover the concrete type. Also adopt the cast(list[str], raw_prompts) form for the prompt-string narrowing in all four executors so the pattern reads identically. Signed-off-by: Zhengyuan Su <su.zhengyuan@u.nus.edu>
Signed-off-by: Noppanat Wadlom <noppanat.wad@gmail.com>
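The shared-`run()` shape the review asked for can be sketched as follows. All class and method names here (`TaskSpecStrictBase`, `require_spec`, the concrete executor) are assumptions mirroring the description above, not the project's actual definitions; the real base would also wrap `_run_inner` in the task span and do the uploads.

```python
import abc
from dataclasses import dataclass


@dataclass
class TaskSpecStrictBase:
    out_dir: str


@dataclass
class SpeechSpec(TaskSpecStrictBase):
    voice: str = "default"


class OmniExecutorBase(abc.ABC):
    # Subclasses override this with their concrete spec type.
    _TASK_SPEC_TYPE: type = TaskSpecStrictBase

    def require_spec(self, task: dict, spec_type: type) -> TaskSpecStrictBase:
        spec = task["spec"]
        if not isinstance(spec, spec_type):
            raise TypeError(f"expected {spec_type.__name__}")
        return spec

    def run(self, task: dict) -> dict:
        # Generic boilerplate lives here once, instead of in four copies.
        spec = self.require_spec(task, self._TASK_SPEC_TYPE)
        return self._run_inner(spec)

    @abc.abstractmethod
    def _run_inner(self, spec: TaskSpecStrictBase) -> dict: ...


class SpeechExecutor(OmniExecutorBase):
    _TASK_SPEC_TYPE = SpeechSpec

    def _run_inner(self, spec: TaskSpecStrictBase) -> dict:
        assert isinstance(spec, SpeechSpec)  # recover the concrete type (the stated drawback)
        return {"ok": True, "voice": spec.voice}
```

The `assert isinstance` is the cost the reviewer notes: `run()` is typed against the base spec, so each subclass re-narrows on entry to satisfy the type checker.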
Purpose
The four omni executors predated the executor mixin chain
(`InferenceMixin` → `DataMixin` → `GovernanceMixin`) and were the last family still on a plain `Executor` base. This PR moves them onto the chain so they emit OTel traces like the inference and training executors, adds three per-phase spans inside each `run()`, routes prompts through the mixin's data path, and uploads spans to the server's `/traces` endpoint so HTTP-destination workers don't strand their trace JSONL on the worker filesystem. RFC #48 omni item.
Changes
- `omni_executor_base.py`: `OmniExecutorBase` inherits `(InferenceMixin, Executor)`. The executor-local `collect_text_inputs` helper is removed; prompts now come from `DataMixin._collect_prompts_for_spec`, with each call site narrowing `PromptInput → str` inline and raising `ExecutionError` if any item is not a `str` (omni executors don't consume the chat-message form).
- `omni_text2{image,speech,audio,general}_executor.py`: each `run()` wraps `self._run_inner(...)` in `self._task_span(...)`, then calls `maybe_upload_artifacts(...)` and `maybe_upload_traces(...)` after the span exits, matching the `vllm_executor` / `diffusers_executor` shape. Doing the uploads after `__exit__` is required so the root task span row is flushed into `spans.jsonl` before the trace upload reads it. Without `maybe_upload_traces`, omni span files stayed on the worker filesystem in HTTP-destination deployments and never reached the server's `/api/v1/traces/workflows/{wfl}/spans` endpoint.
- Three `SpanType.COMPUTE` sub-spans inside the task span: `model load` (`_ensure_omni`), `generation` (the `omni.generate` loop / streaming generator), and `output postprocessing` (artifact save loop). Attributes carry `prompt_count`, `item_count`, and `flowmesh.type=compute`.
- `examples/templates/omni_text2{speech,audio,general}.yaml`: migrated from `data.text: "..."` to the canonical `data.type: list` / `items: [...]` mixin shape.
- `tests/worker/test_omni_executor_inheritance.py`: parametrized over the four executor classes plus the base; asserts each is a subclass of `InferenceMixin`, `DataMixin`, and `GovernanceMixin`. Catches future regressions of the base-class hierarchy.
Test Plan
- `uv run pytest tests/worker/test_omni_executor_inheritance.py tests/server tests/shared tests/sdk tests/cli`: 537 passed, mypy clean across the touched files.
- Live e2e on all four omni templates: `ok=True`, expected artifact on disk (`generated_tts.wav` / `generated_image_*.png` / `bgm.wav` / `narration.wav`), spans include `task`, `model load`, `generation`, and `output postprocessing`, and prompts are threaded through `DataMixin._collect_prompts_for_spec`.
537 unit tests passed; mypy clean.
Live e2e (1 GPU worker, omni-mixins images):
Cold weights dominate; `generation` is second; postprocessing is negligible. Sub-spans sum to within 20 ms of the root in every case. Image, audio, and general were re-validated on the post-mixin-migration commit; results match the table above. The trace-upload fix is behavior-only for HTTP-destination workers and doesn't change wall-clock; local-stack mode reads spans off the shared docker volume regardless.
Pre-submission Checklist
- Ran `pre-commit run --all-files` and fixed any issues.
- `uv run pytest tests/` passes locally.
- Dependencies synced (`uv sync --all-packages --group ci --frozen`).
- Marked breaking changes with `[BREAKING]` and described migration steps above.